{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.\n",
        "\n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Neural style transfer on video\n",
        "Using modified code from `pytorch`'s neural style [example](https://pytorch.org/tutorials/advanced/neural_style_tutorial.html), we show how to setup a pipeline for doing style transfer on video. The pipeline has following steps:\n",
        "1. Split a video into images\n",
        "2. Run neural style on each image using one of the provided models (from `pytorch` pretrained models for this example).\n",
        "3. Stitch the image back into a video.\n",
        "\n",
        "> **Note**\n",
        "This notebook uses public preview functionality (ParallelRunStep). Please install azureml-contrib-pipeline-steps package before running this notebook.\n",
        "```\n",
        "pip install azureml-contrib-pipeline-steps\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prerequisites\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Check core SDK version number\n",
        "import azureml.core\n",
        "\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace, Experiment\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print('Workspace name: ' + ws.name, \n",
        "      'Azure region: ' + ws.location, \n",
        "      'Subscription id: ' + ws.subscription_id, \n",
        "      'Resource group: ' + ws.resource_group, sep = '\\n')"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.core.datastore import Datastore\n",
        "from azureml.data.data_reference import DataReference\n",
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n",
        "from azureml.core.compute_target import ComputeTargetException"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Download models"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "# create directory for model\n",
        "model_dir = 'models'\n",
        "if not os.path.isdir(model_dir):\n",
        "    os.mkdir(model_dir)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import urllib.request\n",
        "\n",
        "def download_model(model_name):\n",
        "    # downloaded models from https://pytorch.org/tutorials/advanced/neural_style_tutorial.html are kept here\n",
        "    url=\"https://pipelinedata.blob.core.windows.net/styletransfer/saved_models/\" + model_name\n",
        "    local_path = os.path.join(model_dir, model_name)\n",
        "    urllib.request.urlretrieve(url, local_path)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Register all Models"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.model import Model\n",
        "mosaic_model = None\n",
        "candy_model = None\n",
        "\n",
        "models = Model.list(workspace=ws, tags=['scenario'])\n",
        "for m in models:\n",
        "    print(\"Name:\", m.name,\"\\tVersion:\", m.version, \"\\tDescription:\", m.description, m.tags)\n",
        "    if m.name == 'mosaic' and mosaic_model is None:\n",
        "        mosaic_model = m\n",
        "    elif m.name == 'candy' and candy_model is None:\n",
        "        candy_model = m\n",
        "\n",
        "if mosaic_model is None:\n",
        "    print('Mosaic model does not exist, registering it')\n",
        "    download_model('mosaic.pth')\n",
        "    mosaic_model = Model.register(model_path = os.path.join(model_dir, \"mosaic.pth\"),\n",
        "                       model_name = \"mosaic\",\n",
        "                       tags = {'type': \"mosaic\", 'scenario': \"Style transfer using batch inference\"},\n",
        "                       description = \"Style transfer - Mosaic\",\n",
        "                       workspace = ws)\n",
        "else:\n",
        "    print('Reusing existing mosaic model')\n",
        "    \n",
        "\n",
        "if candy_model is None:\n",
        "    print('Candy model does not exist, registering it')\n",
        "    download_model('candy.pth')\n",
        "    candy_model = Model.register(model_path = os.path.join(model_dir, \"candy.pth\"),\n",
        "                       model_name = \"candy\",\n",
        "                       tags = {'type': \"candy\", 'scenario': \"Style transfer using batch inference\"},\n",
        "                       description = \"Style transfer - Candy\",\n",
        "                       workspace = ws)\n",
        "else:\n",
        "    print('Reusing existing candy model')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Create or use existing compute"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# AmlCompute\n",
        "cpu_cluster_name = \"cpu-cluster\"\n",
        "try:\n",
        "    cpu_cluster = AmlCompute(ws, cpu_cluster_name)\n",
        "    print(\"found existing cluster.\")\n",
        "except ComputeTargetException:\n",
        "    print(\"creating new cluster\")\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_v2\",\n",
        "                                                                    max_nodes = 1)\n",
        "\n",
        "    # create the cluster\n",
        "    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)\n",
        "    cpu_cluster.wait_for_completion(show_output=True)\n",
        "    \n",
        "# AmlCompute\n",
        "gpu_cluster_name = \"gpu-cluster\"\n",
        "try:\n",
        "    gpu_cluster = AmlCompute(ws, gpu_cluster_name)\n",
        "    print(\"found existing cluster.\")\n",
        "except ComputeTargetException:\n",
        "    print(\"creating new cluster\")\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n",
        "                                                                max_nodes = 3)\n",
        "\n",
        "    # create the cluster\n",
        "    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)\n",
        "    gpu_cluster.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Python Scripts\n",
        "We use an edited version of `neural_style_mpi.py` (original is [here](https://github.com/pytorch/examples/blob/master/fast_neural_style/neural_style/neural_style.py)). Scripts to split and stitch the video are thin wrappers to calls to `ffmpeg`. \n",
        "\n",
        "We install `ffmpeg` through conda dependencies."
      ]
    },
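    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a rough illustration of what such a wrapper can look like, here is a minimal sketch of a splitting script. It is not the notebook's `process_video.py` (that file is printed in the cells that follow); the function name `split_video`, the frame pattern `%05d.jpg`, and the audio file name `audio.aac` are assumptions made for this example.\n",
        "\n",
        "```python\n",
        "import os\n",
        "import subprocess\n",
        "\n",
        "\n",
        "def split_video(input_video, images_dir, audio_dir):\n",
        "    # Hypothetical helper: writes one JPEG per frame plus the audio track.\n",
        "    os.makedirs(images_dir, exist_ok=True)\n",
        "    os.makedirs(audio_dir, exist_ok=True)\n",
        "    # ffmpeg expands the %05d pattern into 00001.jpg, 00002.jpg, ...\n",
        "    frame_pattern = os.path.join(images_dir, '%05d.jpg')\n",
        "    subprocess.run(['ffmpeg', '-i', input_video, frame_pattern], check=True)\n",
        "    # Extract only the audio (-vn drops video) so it can be added back when stitching.\n",
        "    audio_path = os.path.join(audio_dir, 'audio.aac')\n",
        "    subprocess.run(['ffmpeg', '-i', input_video, '-vn', audio_path], check=True)\n",
        "```\n",
        "\n",
        "Passing the command as a list and setting `check=True` avoids shell quoting issues and makes the step fail if `ffmpeg` returns an error."
      ]
    },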
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "scripts_folder = \"scripts\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "process_video_script_file = \"process_video.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, process_video_script_file)) as process_video_file:\n",
        "    print(process_video_file.read())"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "stitch_video_script_file = \"stitch_video.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, stitch_video_script_file)) as stitch_video_file:\n",
        "    print(stitch_video_file.read())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The sample video **organutan.mp4** is stored at a publicly shared datastore. We are registering the datastore below. If you want to take a look at the original video, click here. (https://pipelinedata.blob.core.windows.net/sample-videos/orangutan.mp4)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# datastore for input video\n",
        "account_name = \"pipelinedata\"\n",
        "video_ds = Datastore.register_azure_blob_container(ws, \"videos\", \"sample-videos\",\n",
        "                                            account_name=account_name, overwrite=True)\n",
        "\n",
        "# the default blob store attached to a workspace\n",
        "default_datastore = ws.get_default_datastore()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Sample video"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "video_name=os.getenv(\"STYLE_TRANSFER_VIDEO_NAME\", \"orangutan.mp4\") \n",
        "orangutan_video = DataReference(datastore=video_ds,\n",
        "                            data_reference_name=\"video\",\n",
        "                            path_on_datastore=video_name, mode=\"download\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "cd = CondaDependencies()\n",
        "\n",
        "cd.add_channel(\"conda-forge\")\n",
        "cd.add_conda_package(\"ffmpeg\")\n",
        "\n",
        "# Runconfig\n",
        "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n",
        "amlcompute_run_config.environment.docker.base_image = \"pytorch/pytorch\"\n",
        "amlcompute_run_config.environment.spark.precache_packages = False"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ffmpeg_audio = PipelineData(name=\"ffmpeg_audio\", datastore=default_datastore)\n",
        "processed_images = PipelineData(name=\"processed_images\", datastore=default_datastore)\n",
        "output_video = PipelineData(name=\"output_video\", datastore=default_datastore)\n",
        "\n",
        "ffmpeg_images_ds_name = \"ffmpeg_images_data\"\n",
        "ffmpeg_images = PipelineData(name=\"ffmpeg_images\", datastore=default_datastore)\n",
        "ffmpeg_images_file_dataset = ffmpeg_images.as_dataset()\n",
        "ffmpeg_images_named_file_dataset = ffmpeg_images_file_dataset.as_named_input(ffmpeg_images_ds_name)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Define tweakable parameters to pipeline\n",
        "These parameters can be changed when the pipeline is published and rerun from a REST call.\n",
        "As part of ParallelRunStep following 2 pipeline parameters will be created which can be used to override values.\n",
        "    node_count\n",
        "    process_count_per_node"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core.graph import PipelineParameter\n",
        "# create a parameter for style (one of \"candy\", \"mosaic\") to transfer the images to\n",
        "style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "split_video_step = PythonScriptStep(\n",
        "    name=\"split video\",\n",
        "    script_name=\"process_video.py\",\n",
        "    arguments=[\"--input_video\", orangutan_video,\n",
        "               \"--output_audio\", ffmpeg_audio,\n",
        "               \"--output_images\", ffmpeg_images,\n",
        "              ],\n",
        "    compute_target=cpu_cluster,\n",
        "    inputs=[orangutan_video],\n",
        "    outputs=[ffmpeg_images, ffmpeg_audio],\n",
        "    runconfig=amlcompute_run_config,\n",
        "    source_directory=scripts_folder\n",
        ")\n",
        "\n",
        "stitch_video_step = PythonScriptStep(\n",
        "    name=\"stitch\",\n",
        "    script_name=\"stitch_video.py\",\n",
        "    arguments=[\"--images_dir\", processed_images, \n",
        "               \"--input_audio\", ffmpeg_audio, \n",
        "               \"--output_dir\", output_video],\n",
        "    compute_target=cpu_cluster,\n",
        "    inputs=[processed_images, ffmpeg_audio],\n",
        "    outputs=[output_video],\n",
        "    runconfig=amlcompute_run_config,\n",
        "    source_directory=scripts_folder\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Create environment, parallel step run config and parallel run step"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Environment\n",
        "from azureml.core.runconfig import DEFAULT_GPU_IMAGE\n",
        "\n",
        "parallel_cd = CondaDependencies()\n",
        "\n",
        "parallel_cd.add_channel(\"pytorch\")\n",
        "parallel_cd.add_conda_package(\"pytorch\")\n",
        "parallel_cd.add_conda_package(\"torchvision\")\n",
        "\n",
        "styleenvironment = Environment(name=\"styleenvironment\")\n",
        "styleenvironment.python.conda_dependencies=parallel_cd\n",
        "styleenvironment.docker.base_image = DEFAULT_GPU_IMAGE"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.contrib.pipeline.steps import ParallelRunConfig\n",
        "\n",
        "parallel_run_config = ParallelRunConfig(\n",
        "                    environment=styleenvironment,\n",
        "                    entry_script='transform.py',\n",
        "                    output_action='summary_only',\n",
        "                    mini_batch_size=\"1\",\n",
        "                    error_threshold=1,\n",
        "                    source_directory=scripts_folder,\n",
        "                    compute_target=gpu_cluster, \n",
        "                    node_count=3)"
      ]
    },
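    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "ParallelRunStep expects the entry script to define an `init()` function, called once per worker process, and a `run(mini_batch)` function, called for every mini-batch. The skeleton below is a minimal sketch of that contract and is not the `transform.py` used by this notebook: the model loading and the stylization itself are left as comments, and returning the file names is an assumption made for illustration.\n",
        "\n",
        "```python\n",
        "import argparse\n",
        "import os\n",
        "\n",
        "\n",
        "def init():\n",
        "    # Runs once per worker process, before any mini-batch is handled.\n",
        "    global style\n",
        "    parser = argparse.ArgumentParser()\n",
        "    parser.add_argument('--style', type=str, default='mosaic')\n",
        "    # parse_known_args ignores the extra arguments added by the step itself.\n",
        "    args, _ = parser.parse_known_args()\n",
        "    style = args.style\n",
        "    # A real script would load the registered model for `style` here.\n",
        "\n",
        "\n",
        "def run(mini_batch):\n",
        "    # For a file dataset input, mini_batch is a list of file paths.\n",
        "    results = []\n",
        "    for image_path in mini_batch:\n",
        "        # A real script would stylize the image and save the result here.\n",
        "        results.append(os.path.basename(image_path))\n",
        "    # Return one entry per successfully processed item.\n",
        "    return results\n",
        "```\n",
        "\n",
        "With `mini_batch_size=\"1\"`, each call to `run` receives a single image path."
      ]
    },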
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.contrib.pipeline.steps import ParallelRunStep\n",
        "from datetime import datetime\n",
        "\n",
        "parallel_step_name = 'styletransfer-' + datetime.now().strftime('%Y%m%d%H%M')\n",
        "\n",
        "distributed_style_transfer_step = ParallelRunStep(\n",
        "    name=parallel_step_name,\n",
        "    inputs=[ffmpeg_images_named_file_dataset], # Input file share/blob container/file dataset\n",
        "    output=processed_images,  # Output file share/blob container\n",
        "    models=[mosaic_model, candy_model],\n",
        "    tags = {'scenario': \"batch inference\", 'type': \"demo\"},\n",
        "    properties = {'area': \"style transfer\"},\n",
        "    arguments=[\"--style\", style_param],\n",
        "    parallel_run_config=parallel_run_config,\n",
        "    allow_reuse=True #[optional - default value True]\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Run the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = Pipeline(workspace=ws, steps=[stitch_video_step])\n",
        "\n",
        "pipeline.validate()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# submit the pipeline and provide values for the PipelineParameters used in the pipeline\n",
        "pipeline_run = Experiment(ws, 'styletransfer_parallel_mosaic').submit(pipeline)"
      ]
    },
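    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A different style can also be chosen at submission time from the SDK, without publishing the pipeline first. The snippet below is a sketch rather than a required step: it assumes the `pipeline`, `ws` and `Experiment` objects defined earlier in this notebook, the variable name `candy_run` is made up for this example, and running it starts a second pipeline run.\n",
        "\n",
        "```python\n",
        "# Override the default value of the 'style' PipelineParameter for this run only.\n",
        "candy_run = Experiment(ws, 'styletransfer_parallel_candy').submit(\n",
        "    pipeline,\n",
        "    pipeline_parameters={'style': 'candy'})\n",
        "```"
      ]
    },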
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Monitor using widget"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Track pipeline run progress\n",
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Downloads the video in `output_video` folder"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Download output video"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def download_video(run, target_dir=None):\n",
        "    stitch_run = run.find_step_run(\"stitch\")[0]\n",
        "    port_data = stitch_run.get_output_data(\"output_video\")\n",
        "    port_data.download(target_dir, show_progress=True)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion()\n",
        "download_video(pipeline_run, \"output_video_mosaic\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Publish pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_name = \"style-transfer-batch-inference\"\n",
        "print(pipeline_name)\n",
        "\n",
        "published_pipeline = pipeline.publish(\n",
        "    name=pipeline_name, \n",
        "    description=pipeline_name)\n",
        "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Get published pipeline\n",
        "This is another way to get the published pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PublishedPipeline\n",
        "\n",
        "# You could retrieve all pipelines that are published, or \n",
        "# just get the published pipeline object that you have the ID for.\n",
        "\n",
        "# Get all published pipeline objects in the workspace\n",
        "all_pub_pipelines = PublishedPipeline.list(ws)\n",
        "\n",
        "# We will iterate through the list of published pipelines and \n",
        "# use the last ID in the list for Schelue operations: \n",
        "print(\"Published pipelines found in the workspace:\")\n",
        "for pub_pipeline in all_pub_pipelines:\n",
        "    print(\"Name:\", pub_pipeline.name,\"\\tDescription:\", pub_pipeline.description, \"\\tId:\", pub_pipeline.id, \"\\tStatus:\", pub_pipeline.status)\n",
        "    if(pub_pipeline.name == pipeline_name):\n",
        "        published_pipeline = pub_pipeline\n",
        "\n",
        "print(\"Published pipeline id: {}\".format(published_pipeline.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Run pipeline through REST calls for other styles\n",
        "\n",
        "# Get AAD token"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Get endpoint URL"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "rest_endpoint = published_pipeline.endpoint\n",
        "print(\"Pipeline REST endpoing: {}\".format(rest_endpoint))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Send request and monitor"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "experiment_name = 'styletransfer_parallel_candy'\n",
        "response = requests.post(rest_endpoint, \n",
        "                         headers=aad_token,\n",
        "                         json={\"ExperimentName\": experiment_name,\n",
        "                               \"ParameterAssignments\": {\"style\": \"candy\", \"aml_node_count\": 2}})\n",
        "run_id = response.json()[\"Id\"]\n",
        "\n",
        "from azureml.pipeline.core.run import PipelineRun\n",
        "published_pipeline_run_candy = PipelineRun(ws.experiments[experiment_name], run_id)\n",
        "\n",
        "RunDetails(published_pipeline_run_candy).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Download output from re-run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline_run_candy.wait_for_completion()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "download_video(published_pipeline_run_candy, target_dir=\"output_video_candy\")"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil joringer asraniwa pansav tracych"
      }
    ],
    "category": "Other notebooks",
    "compute": [
      "AML Compute"
    ],
    "datasets": [],
    "deployment": [
      "None"
    ],
    "exclude_from_index": true,
    "framework": [
      "None"
    ],
    "friendly_name": "Style transfer using ParallelRunStep",
    "index_order": 1,
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    },
    "tags": [
      "Batch Inferencing",
      "Pipeline"
    ],
    "task": "Style transfer"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}